
###############################
## 4. RUN MATCH
###############################

### The RStudio restart is embedded in the function itself. After each cluster the
# function writes its progress (the cluster index `i` and the completion status in
# `match.groups`) to the global environment, restarts R, and then calls itself
# again with those updated values. The function stops calling itself once the
# status log marks all match groups as complete.


match_cluster <- function(i=1,match.groups=match.groups,
                          cluster.range=NULL,
                          voter.sub=NULL,
                          mto.sub=NULL,
                          temp.folder=temp.folder){
  
  
  # define variables for matching (will remove those with no variation in a moment)
  vars <- c('first','last','middle','suffix','birthday','birthmonth','birthyear') # I removed gender here because "unknown" is a missing data problem
  
  #### KEY: i = cluster within match group (e.g. 83 in small cluster male match)
              # this will be restored iteratively at each re-sourcing of the function
              # code
          # match.groups = dataframe of possible match groups (small or main version
              # of clusters, by gender) and status of match completion 
          # cluster.range =  c(1, # of clusters) min and max of possible cluster identifiers
  
  
  # determine where we are in the match process
  NAindex <- which(is.na(match.groups$status)) # identify first group that status=NA
  
  if(length(NAindex)==0){
    cat("all matches done")
    rstudioapi::restartSession(command="source('5_reattach_data.R')") 
    
  }
  assign("group", value=as.character(match.groups$group[min(NAindex)]), envir=.GlobalEnv)
  
  assign("gender", value=match.groups$gender[match.groups$group==group], envir = .GlobalEnv)
  assign("size", value=match.groups$size[match.groups$group==group], envir = .GlobalEnv)

  
  if(match.groups$size[match.groups$group==group]=="small_clusters"){
    save.folder <- paste0(temp.folder,"/small_match/")
  }else{
    save.folder <- paste0(temp.folder,"/main_match/")
  }

  
  if(is.null(mto.sub) & group %in% c("small.match.male","small.match.female")){
    filename <- paste0(temp.folder,size,
                       '/small_mto_clusters_',
                       gender,'.RData')
    load(filename)
    
    #dataframe for now
    mto.sub <- data.frame(mto.sub)
    
    # make sure everything is lowercase
    mto.sub[,names(mto.sub) %in% vars] <- sapply(mto.sub[,names(mto.sub) %in% vars], function(x) tolower(x)) 
  }
  
  if(is.null(mto.sub) & group %in% c("main.match.male","main.match.female")){
    filename <- paste0(temp.folder,
           'mto_clusters_',
           gender
           ,'.RData')
    
    load(filename)
    
    #dataframe for now
    mto.sub <- data.frame(mto.sub)
    #cat(class(mto.sub))
    # make sure everything is lowercase
    mto.sub[,names(mto.sub) %in% vars] <- sapply(mto.sub[,names(mto.sub) %in% vars], function(x) tolower(x)) 
 
  }

  if(group=="small.match.male"){
    
   
    if(i==1){
      
      i <- 83
      new.i <- 83
      assign("i",value=new.i, envir=.GlobalEnv)
      
      
    }
    assign("cluster.range", value=c(83,84), envir = .GlobalEnv)
  }else{
    assign("cluster.range",value=c(1,max(mto.sub$cluster)), envir = .GlobalEnv)
  }
  
  cat(group,i,"\n")
  
  # load l2 cluster
  if(group %in% c("small.match.male","small.match.female")){
    
    
    filename <- paste0(temp.folder,"small_clusters/small_",paste('l2',
                                                                 gender,
                                                                 i,sep='_'),'.RData')
   # cat(filename,"\n")
    #cat(group,"\n")
    load(file=filename)
   
    
  }else{ 
    filename <- paste0(temp.folder,paste('l2',gender,
                                         i,sep='_'),'.RData')
    load(file=filename,envir=.GlobalEnv)
      
    }
  
 
  # subset mto data, make sure everything's in data.table
  mto.cluster <- data.table(subset(mto.sub, cluster == i))
  
  # fastLink requires variation in variables we're matching on, so restrict match vars to those with variation
  variation.l2 <- sapply(voter.sub[, ..vars],function(x) length(unique(x)))
  variation.mto <- sapply(mto.cluster[, ..vars], function(x) length(unique(x)))
  drop <- c(variation.l2[which(variation.l2==1)],variation.mto[which(variation.mto==1)])
  match.vars <- vars[!(vars %in% names(drop))]
  #print(match.vars)
  
  # run matcher: this returns partial matches for string variables (first/last name) with default cut-offs
  x <- "good to go" # these are placeholders to let us know how things are proceeding
  y <- "good to go"
  
  # catch error that occurs when data should only be processed on one core instead of 
    # parallelized over many
  tryCatch({
    matches.out <- fastLink(
      dfA = mto.cluster, dfB = voter.sub, 
      varnames = match.vars,
      stringdist.match = c("first","last")[c("first","last") %in% match.vars],
      partial.match=c("first","last")[c("first","last") %in% match.vars],
      threshold.match = 0.75,
      dedupe.matches = F,
      n.cores=4 # NOTE: can be adjusted if more/fewer cores are available over which to parallelize, in order to speed up match process   
    )}, error=
      function(e){cat("ERROR :",conditionMessage(e),"\n")
        x <<-"Move to one core"}, finally={
          cat(x,"\n")})
  
  if(x=="Move to one core"){
    cat("Trying one core\n")
    tryCatch({
      matches.out <- fastLink(
        dfA = mto.cluster, dfB = voter.sub, 
        varnames = match.vars,
        stringdist.match = c("first","last")[c("first","last") %in% match.vars],
        partial.match=c("first","last")[c("first","last") %in% match.vars],
        threshold.match = 0.75,
        dedupe.matches = F,
        n.cores=1 # NOTE: can be adjusted if more/fewer cores are available over which to parallelize, in order to speed up match process   
      )}, error=
        function(e){cat("ERROR :",conditionMessage(e),"\n")
          y <<-"Failure"}, finally={
            cat(y,"\n")})
    
    if(y=="Failure"){
      cat(i, "Failed for both\n")
     
    }
  }
  
  if(y=="Failure"){
    cat("Moving to the next one\n")
    
    new.i <- i+1
    
    if(new.i>max(cluster.range)){
      
      assign("mto.sub",value=NULL, envir=.GlobalEnv)
      assign("cluster.range",value=NULL, envir=.GlobalEnv)
      assign("i",value=1, envir=.GlobalEnv) 
      
      match.groups$status[match.groups$group==group] <- "done"
      assign("match.groups", value=match.groups, envir=.GlobalEnv)
      rstudioapi::restartSession(command="match_cluster(i=i,temp.folder=temp.folder,match.groups=match.groups,voter.sub=voter.sub,mto.sub=mto.sub,cluster.range=cluster.range)") 
      
    }else{
      assign("i",value=new.i,envir = .GlobalEnv)
      # return(i)
      
      rstudioapi::restartSession(command="match_cluster(i=i,temp.folder=temp.folder,match.groups=match.groups,voter.sub=voter.sub,mto.sub=mto.sub,cluster.range=cluster.range)") 
    }
  }else{
    cat(i, 'matcher run\n')

    
    # subset dataframes to matches
    matched <- getMatches(mto.cluster, 
                          voter.sub,
                          matches.out,
                          combine.dfs=F,
                          threshold.match=.75) # returns list with two data frames with matches from each dataset
    
    # combine into one data.frame
    matched.subset <- data.frame(do.call(cbind,matched))
    
    
    # identify participants that failed to be matched
    unmatched.subset <- mto.cluster[!(id %in% matched.subset$dfA.match.id)]
    #cat(nrow(unmatched.subset),"\n")
    #cat(class(unmatched.subset),"\n")
    
    # remove variable duplicates and revert to old names for ease + clean up
    names(matched.subset) <- c(gsub(paste(c('dfA.match.'),collapse='|'),'mto.',names(matched.subset)[grepl('dfA.match.',names(matched.subset))]),
                               gsub(paste(c('dfB.match.'), collapse='|'),'',names(matched.subset)[grepl('dfB.match.',names(matched.subset))]))
    matched.subset <- matched.subset[,!names(matched.subset) %in% paste0("mto.",c("gamma.1","gamma.2","gamma.3","gamma.4","gamma.5","gamma.6","gamma.7","posterior"))]
    #print(names(matched.subset))
    
    names(unmatched.subset) <- paste("mto",names(unmatched.subset),sep=".")
    
    # recode gender
    matched.subset$gender[matched.subset$gender %in% c('<NA>',NA)] <- ''
    
    # drop observations where the algorithm failed to converge
    if(sum(names(matched.subset) %in% "posterior")>0){
      matched.subset  <- matched.subset[!is.na(matched.subset$posterior),]
    }
    
    matched.subset <- data.table(matched.subset)
    
    # aggregate matches, failed matches, and match summary statistics
    #clusters.matched[[id]] <- matched.subset
    save(matched.subset,file=paste0(save.folder,match.groups$gender[match.groups$group==group],"_",i,"_matched",".RData"))
    
    # clusters.unmatched[[id]] <- unmatched.subset
    save(unmatched.subset,file=paste0(save.folder,match.groups$gender[match.groups$group==group],"_",i,"_unmatched",".RData"))
    
    # clusters.EM[[id]] <- matches.out$EM
    em.object <- matches.out$EM
    if(is.null(em.object)){
      rm(list='voter.sub','matches.out','matched')
      cat(id,'done\n')
      gc()
      next    
    }
    
    save(em.object, file=paste0(save.folder,match.groups$gender[match.groups$group==group],"_",i,"_EM_object",".RData"))
    
    cat("done\n")
    
    new.i <- i+1
    
    if(new.i>max(cluster.range)){
      
      assign("mto.sub",value=NULL, envir=.GlobalEnv)
      assign("cluster.range",value=NULL, envir=.GlobalEnv)
      assign("i",value=1, envir=.GlobalEnv) 
      
      match.groups$status[match.groups$group==group] <- "done"
      assign("match.groups", value=match.groups, envir=.GlobalEnv)
      
      rstudioapi::restartSession(command="match_cluster(i=i,temp.folder=temp.folder,match.groups=match.groups,voter.sub=voter.sub,mto.sub=mto.sub,cluster.range=cluster.range)") 
      
      
    }else{
      assign("i",value=new.i,envir = .GlobalEnv)
      # return(i)
      
      rstudioapi::restartSession(command="match_cluster(i=i,temp.folder=temp.folder,match.groups=match.groups,voter.sub=voter.sub,mto.sub=mto.sub,cluster.range=cluster.range)") 
    }
    
  }
  
 
} 
  
  

